{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Amazon Kinesis Data Firehose\n",
    "\n",
    "Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), Splunk, and any custom HTTP endpoint. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"img/kinesis_firehose_transform.png\" width=\"90%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "import pandas as pd\n",
    "import json\n",
    "\n",
    "sess = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "region = boto3.Session().region_name\n",
    "\n",
    "sm = boto3.Session().client(service_name=\"sagemaker\", region_name=region)\n",
    "firehose = boto3.Session().client(service_name=\"firehose\", region_name=region)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r firehose_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    firehose_name\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(firehose_name)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Check IAM Roles Are In Place"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_kinesis_role_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    iam_kinesis_role_name\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(iam_kinesis_role_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_role_kinesis_arn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    iam_role_kinesis_arn\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(iam_role_kinesis_arn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_kinesis_role_passed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    iam_kinesis_role_passed\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(iam_kinesis_role_passed)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if not iam_kinesis_role_passed:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "else:\n",
    "    print(\"[OK]\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Retrieve Lambda ARN"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r lambda_fn_arn_invoke_ep"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    lambda_fn_arn_invoke_ep\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(lambda_fn_arn_invoke_ep)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create a Kinesis Data Firehose Delivery Stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from botocore.exceptions import ClientError\n",
    "\n",
    "try:\n",
    "    response = firehose.create_delivery_stream(\n",
    "        DeliveryStreamName=firehose_name,\n",
    "        DeliveryStreamType=\"DirectPut\",\n",
    "        ExtendedS3DestinationConfiguration={\n",
    "            \"RoleARN\": iam_role_kinesis_arn,\n",
    "            \"BucketARN\": \"arn:aws:s3:::{}\".format(bucket),\n",
    "            \"Prefix\": \"kinesis-data-firehose/\",\n",
    "            \"ErrorOutputPrefix\": \"kinesis-data-firehose-error/\",\n",
    "            \"BufferingHints\": {\"SizeInMBs\": 1, \"IntervalInSeconds\": 60},\n",
    "            \"CompressionFormat\": \"UNCOMPRESSED\",\n",
    "            \"CloudWatchLoggingOptions\": {\n",
    "                \"Enabled\": True,\n",
    "                \"LogGroupName\": \"/aws/kinesisfirehose/dsoaws-kinesis-data-firehose\",\n",
    "                \"LogStreamName\": \"S3Delivery\",\n",
    "            },\n",
    "            \"ProcessingConfiguration\": {\n",
    "                \"Enabled\": True,\n",
    "                \"Processors\": [\n",
    "                    {\n",
    "                        \"Type\": \"Lambda\",\n",
    "                        \"Parameters\": [\n",
    "                            {\n",
    "                                \"ParameterName\": \"LambdaArn\",\n",
    "                                \"ParameterValue\": \"{}:$LATEST\".format(lambda_fn_arn_invoke_ep),\n",
    "                            },\n",
    "                            {\"ParameterName\": \"BufferSizeInMBs\", \"ParameterValue\": \"1\"},\n",
    "                            {\"ParameterName\": \"BufferIntervalInSeconds\", \"ParameterValue\": \"60\"},\n",
    "                        ],\n",
    "                    }\n",
    "                ],\n",
    "            },\n",
    "            \"S3BackupMode\": \"Enabled\",\n",
    "            \"S3BackupConfiguration\": {\n",
    "                \"RoleARN\": iam_role_kinesis_arn,\n",
    "                \"BucketARN\": \"arn:aws:s3:::{}\".format(bucket),\n",
    "                \"Prefix\": \"kinesis-data-firehose-source-record/\",\n",
    "                \"ErrorOutputPrefix\": \"!{firehose:error-output-type}/\",\n",
    "                \"BufferingHints\": {\"SizeInMBs\": 1, \"IntervalInSeconds\": 60},\n",
    "                \"CompressionFormat\": \"UNCOMPRESSED\",\n",
    "            },\n",
    "            \"CloudWatchLoggingOptions\": {\n",
    "                \"Enabled\": False,\n",
    "            },\n",
    "        },\n",
    "    )\n",
    "    print(\"Delivery stream {} successfully created.\".format(firehose_name))\n",
    "    print(json.dumps(response, indent=4, sort_keys=True, default=str))\n",
    "except ClientError as e:\n",
    "    if e.response[\"Error\"][\"Code\"] == \"ResourceInUseException\":\n",
    "        print(\"Delivery stream {} already exists.\".format(firehose_name))\n",
    "    else:\n",
    "        print(\"Unexpected error: %s\" % e)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## _This may take 1-2 minutes.  Please be patient._"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "status = \"\"\n",
    "while status != \"ACTIVE\":\n",
    "    r = firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)\n",
    "    description = r.get(\"DeliveryStreamDescription\")\n",
    "    status = description.get(\"DeliveryStreamStatus\")\n",
    "    time.sleep(5)\n",
    "\n",
    "print(\"Delivery Stream {} is active\".format(firehose_name))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "firehose_arn = r[\"DeliveryStreamDescription\"][\"DeliveryStreamARN\"]\n",
    "print(firehose_arn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store firehose_arn"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Review Kinesis Data Firehose Delivery Stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Review <a target=\"blank\" href=\"https://console.aws.amazon.com/firehose/home?region={}#/details/{}/details\"> Firehose</a></b>'.format(\n",
    "            region, firehose_name\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Store Variables for the Next Notebooks"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Release Resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%html\n",
    "\n",
    "<p><b>Shutting down your kernel for this notebook to release resources.</b></p>\n",
    "<button class=\"sm-command-button\" data-commandlinker-command=\"kernelmenu:shutdown\" style=\"display:none;\">Shutdown Kernel</button>\n",
    "        \n",
    "<script>\n",
    "try {\n",
    "    els = document.getElementsByClassName(\"sm-command-button\");\n",
    "    els[0].click();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}    \n",
    "</script>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%javascript\n",
    "\n",
    "try {\n",
    "    Jupyter.notebook.save_checkpoint();\n",
    "    Jupyter.notebook.session.delete();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
